package com.galeno.练习

import ch.hsr.geohash.GeoHash
import com.alibaba.fastjson.{JSON, JSONObject}
import com.galeno.utils.SparkUtil
import org.apache.spark.rdd.RDD

/**
 * @Title: GeoHashDemo4
 * @Description: Spark job that enriches app-log JSON records with province/city/region looked up by geohash.
 * @author galeno
 * @date 2021/9/221:49
 */
/**
 * Spark batch job that tags every app-log record with the administrative area
 * (province / city / region) of the place it was reported from.
 *
 * Inputs:
 *  - `data/app_log_2021-06-05.log`: one JSON document per line, read for its
 *    `latitude` and `longitude` fields.
 *  - `data/ref_geohash`: comma-separated lines whose first four fields are used as
 *    `geohash,province,city,region`.
 *
 * Output: `dataout/areaOut`, one JSON document per line with `province`, `city` and
 * `region` added. Records that cannot be located keep empty strings for all three.
 */
object GeoHashDemo4 {

  import scala.util.Try

  /** Number of geohash characters used as the join key. */
  private val GeoHashPrecision = 6

  /**
   * Join key given to log records without usable coordinates. Reference rows with an
   * empty geohash are discarded in [[parseRefLine]], so this key never matches and the
   * left outer join yields empty area fields for those records.
   */
  private val NoGeoHash = ""

  /**
   * Parses one reference line into `(geohash, (province, city, region))`.
   *
   * @param line raw comma-separated line
   * @return the parsed entry, or `None` when the line has fewer than four fields or an
   *         empty geohash
   */
  private def parseRefLine(line: String): Option[(String, (String, String, String))] =
    // limit = -1 keeps trailing empty fields, so "hash,p,c," still yields four columns.
    line.split(",", -1) match {
      case Array(geo, province, city, region, _*) if geo.nonEmpty =>
        Some((geo, (province, city, region)))
      case _ =>
        None
    }

  /**
   * Computes the join key of a log record from its `latitude` / `longitude` fields.
   *
   * @param json parsed log record
   * @return the geohash string, or [[NoGeoHash]] when a coordinate is missing or the
   *         geohash cannot be computed from the stored values
   */
  private def geoHashOf(json: JSONObject): String =
    Try {
      for {
        // getDouble returns a boxed value; Option(...) turns a missing field into None
        // instead of failing when the value is unboxed.
        lat <- Option(json.getDouble("latitude"))
        lng <- Option(json.getDouble("longitude"))
      } yield GeoHash.geoHashStringWithCharacterPrecision(lat, lng, GeoHashPrecision)
    }.toOption.flatten.getOrElse(NoGeoHash)

  /**
   * Runs the job: loads both inputs, joins them on the geohash and writes the enriched
   * records.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sc = SparkUtil.getSc
    try {
      // Load the raw log file.
      val logLines: RDD[String] = sc.textFile("data/app_log_2021-06-05.log")

      // Load the geohash reference table and keep a single area per geohash.
      // reduceByKey combines duplicates before the shuffle instead of collecting every
      // value of a key just to take one of them.
      val areaByGeoHash: RDD[(String, (String, String, String))] =
        sc.textFile("data/ref_geohash")
          .flatMap(line => parseRefLine(line).iterator)
          .reduceByKey((first, _) => first)

      // Key every log record by its geohash. Blank lines carry no record and are skipped.
      val logsByGeoHash: RDD[(String, JSONObject)] =
        logLines
          .filter(_.trim.nonEmpty)
          .map { line =>
            val json = JSON.parseObject(line)
            (geoHashOf(json), json)
          }

      // Left outer join so that log records without a matching area are still emitted.
      val enriched: RDD[String] =
        logsByGeoHash.leftOuterJoin(areaByGeoHash).map {
          case (_, (json, area)) =>
            val (province, city, region) = area.getOrElse(("", "", ""))
            // Add the area fields to the JSON record.
            json.put("province", province)
            json.put("city", city)
            json.put("region", region)
            json.toJSONString
        }

      enriched.saveAsTextFile("dataout/areaOut")
    } finally {
      // Always release the Spark context, including when the job fails.
      sc.stop()
    }
  }

}
